/*
 * Copyright (c) 2022. China Mobile (SuZhou) Software Technology Co.,Ltd. All rights reserved.
 * Lakehouse is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package com.chinamobile.cmss.lakehouse.core.deploy.component.instance.yunikorn;

import com.chinamobile.cmss.lakehouse.common.kubernetes.K8sModelConstant;
import com.chinamobile.cmss.lakehouse.core.config.ComponentConfiguration;
import com.chinamobile.cmss.lakehouse.core.config.KubernetesConfiguration;
import com.chinamobile.cmss.lakehouse.core.deploy.component.descriptor.ComponentDescriptor;
import com.chinamobile.cmss.lakehouse.core.handler.K8sDeployHandler;
import com.chinamobile.cmss.lakehouse.core.handler.K8sServiceHandler;
import com.chinamobile.cmss.lakehouse.core.handler.NodePortHandler;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.kubernetes.client.custom.IntOrString;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1ObjectMetaBuilder;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServicePort;
import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Builds the Kubernetes resources that make up a YuniKorn deployment: a config map, a headless
 * service, an optional NodePort service and a stateful set. The config map, headless service and
 * stateful set are parsed from the template returned by
 * {@code KubernetesConfiguration.getYunikornYamlTemplate()} and then adjusted in place.
 */
@Component
public class YuniKornComponentDescriptor extends ComponentDescriptor {

    /** Container port targeted by the "rpc" service port. */
    private static final int YUNIKORN_RPC_PORT = 9080;
    /** Container port targeted by the "ui" service port. */
    private static final int YUNIKORN_UI_PORT = 9889;
    /** Number of node ports consumed by one NodePort service (rpc + ui). */
    private static final int YUNIKORN_PORT_COUNT = 2;

    private static final String YUNIKORN_WEB_IMAGE_NAME = "yunikorn-web";
    private static final String YUNIKORN_CONFIG_MAP_NAME = "yunikorn-configs";
    private static final String YUNIKORN_STATEFUL_SET_NAME = "yunikorn-sts";
    private static final String YUNIKORN_HEADLESS_SERVICE = "yunikorn-headless";
    private static final String YUNIKORN_NODEPORT_SERVICE = "yunikorn-nodeport";
    private static final String YUNIKORN_RPC_PORT_NAME = "rpc";
    private static final String YUNIKORN_UI_PORT_NAME = "ui";

    @Autowired
    protected K8sDeployHandler deployHandler;

    @Autowired
    protected NodePortHandler nodePortHandler;

    /**
     * Parses the YuniKorn config map from the YAML template and applies the fixed name and labels.
     *
     * @param config component configuration; not read by this method
     * @return the config map, always present
     * @throws IOException declared for template parsing
     */
    public Optional<V1ConfigMap> buildYunKornConfigMap(ComponentConfiguration config)
        throws IOException {
        V1ConfigMap configMap = parseConfigMap(KubernetesConfiguration.getYunikornYamlTemplate());

        // set name
        configMap.getMetadata().setName(YUNIKORN_CONFIG_MAP_NAME);

        // update label to "app: yunikorn" since hard-coded yunikorn api
        configMap.getMetadata().setLabels(K8sModelConstant.YUNIKORN_CONFIG_MAP_LABELS);

        return Optional.of(configMap);
    }

    /**
     * Builds the list of config maps for this component, which holds the single YuniKorn config map.
     *
     * @param config component configuration, forwarded to {@link #buildYunKornConfigMap}
     * @return a present, mutable list of config maps
     * @throws IOException declared for template parsing
     */
    @Override
    public Optional<List<V1ConfigMap>> buildConfigMap(ComponentConfiguration config)
        throws IOException {

        List<V1ConfigMap> configMapList = Lists.newArrayList();
        buildYunKornConfigMap(config).ifPresent(configMapList::add);
        return Optional.of(configMapList);
    }

    /**
     * Builds the services for this component: the headless service parsed from the template and,
     * when {@code KubernetesConfiguration.exposeServiceEnabled} is set, the NodePort service(s).
     *
     * @param config component configuration; not read by this method
     * @return a present list with the headless service first
     * @throws IOException declared for template parsing
     * @throws IllegalStateException if exposure is enabled and too few node ports were acquired
     */
    @Override
    public Optional<List<V1Service>> buildService(ComponentConfiguration config) throws IOException {
        V1Service headlessSvc = parseService(KubernetesConfiguration.getYunikornYamlTemplate());

        // set headless service name
        headlessSvc.getMetadata().setName(YUNIKORN_HEADLESS_SERVICE);

        List<V1Service> services = Lists.newArrayList();
        services.add(headlessSvc);

        if (KubernetesConfiguration.exposeServiceEnabled) {
            services.addAll(buildNodePortService());
        }

        return Optional.of(services);
    }

    /**
     * Acquires node ports in the default namespace and builds one NodePort service per group, each
     * exposing the rpc and ui ports. Both the service port and the node port are set to the acquired
     * port number; the target port is the fixed container port.
     *
     * @return the NodePort services (one entry while {@code numGroups} is 1)
     * @throws IllegalStateException if fewer node ports than requested were acquired
     */
    private List<V1Service> buildNodePortService() {
        // acquire node ports, only for ipv4
        int numGroups = 1;
        int requiredPortCount = numGroups * YUNIKORN_PORT_COUNT;
        List<Integer> nodePorts =
            nodePortHandler.acquireNodePorts(K8sModelConstant.DEFAULT_NAMESPACE, requiredPortCount);

        // fail with a descriptive error instead of an index/null failure further down
        if (nodePorts == null || nodePorts.size() < requiredPortCount) {
            int acquired = nodePorts == null ? 0 : nodePorts.size();
            throw new IllegalStateException("Expected " + requiredPortCount + " node ports for service "
                + YUNIKORN_NODEPORT_SERVICE + " but acquired " + acquired);
        }

        List<V1Service> nodePortServices = Lists.newArrayList();
        for (int group = 0; group < numGroups; group++) {
            // each group owns YUNIKORN_PORT_COUNT consecutive entries: [rpc, ui]
            int rpcIndex = group * YUNIKORN_PORT_COUNT;
            Integer rpcNodePort = nodePorts.get(rpcIndex);
            Integer uiNodePort = nodePorts.get(rpcIndex + 1);

            // set labels (copied so the shared constant map is not handed to the metadata)
            Map<String, String> serviceLabels = Maps.newHashMap();
            serviceLabels.putAll(K8sModelConstant.YUNIKORN_SERVICE_LABELS);

            // set ports
            List<V1ServicePort> ports = Lists.newArrayList();
            ports.add(new V1ServicePort().name(YUNIKORN_RPC_PORT_NAME).port(rpcNodePort)
                .nodePort(rpcNodePort).targetPort(new IntOrString(YUNIKORN_RPC_PORT)));
            ports.add(new V1ServicePort().name(YUNIKORN_UI_PORT_NAME).port(uiNodePort)
                .nodePort(uiNodePort).targetPort(new IntOrString(YUNIKORN_UI_PORT)));

            // NOTE(review): every group would get the same service name; fine while numGroups is 1
            V1Service nodePortSvc = new V1Service().apiVersion(K8sModelConstant.API_VERSION)
                .kind(K8sModelConstant.SERVICE_KIND)
                .metadata(new V1ObjectMetaBuilder().withName(YUNIKORN_NODEPORT_SERVICE)
                    .withLabels(serviceLabels).build())
                .spec(new V1ServiceSpec().ports(ports)
                    .selector(K8sModelConstant.YUNIKORN_SERVICE_LABELS)
                    .type(K8sModelConstant.NODE_PORT));

            nodePortServices.add(nodePortSvc);
        }

        return nodePortServices;
    }

    /**
     * Parses the YuniKorn stateful set from the template and sets its replica count, headless
     * service name, container images, node selector and resources.
     *
     * @param config component configuration; supplies {@code YuniKornComponentOptions.YUNIKORN_REPLICA}
     * @return the stateful set, always present
     * @throws IOException declared for template parsing
     */
    @Override
    public Optional<V1StatefulSet> buildStatefulSet(ComponentConfiguration config)
        throws IOException {
        int replica = config.getInteger(YuniKornComponentOptions.YUNIKORN_REPLICA);

        V1StatefulSet statefulSet = parseStatefulSet(KubernetesConfiguration.getYunikornYamlTemplate());

        // set replica
        statefulSet.getSpec().setReplicas(replica);

        // set headless service
        statefulSet.getSpec().setServiceName(YUNIKORN_HEADLESS_SERVICE);

        // update image: the web container gets the web image, every other container the scheduler image
        statefulSet.getSpec().getTemplate().getSpec().getContainers().forEach(c -> {
            if (YUNIKORN_WEB_IMAGE_NAME.equals(c.getName())) {
                c.image(KubernetesConfiguration.yunikornWebImage);
            } else {
                c.image(KubernetesConfiguration.yunikornSchedulerImage);
            }
        });

        // update node selector
        statefulSet.getSpec().getTemplate().getSpec().nodeSelector(KubernetesConfiguration.getNodeSelector());

        // update resource: memory is the configured heap scaled by the memory over-ratio
        double memExtended = KubernetesConfiguration.yunikornHeap * KubernetesConfiguration.memoryOverRatio;
        updateStatefulSetResources(statefulSet, KubernetesConfiguration.cpuOverRatio,
            KubernetesConfiguration.containerVcore, memExtended);

        return Optional.of(statefulSet);
    }

    /**
     * YuniKorn is deployed as a stateful set, so no deployment is built.
     *
     * @param config component configuration; not read by this method
     * @return always empty
     */
    @Override
    public Optional<V1Deployment> buildDeployment(ComponentConfiguration config) {
        return Optional.empty();
    }

    /**
     * Returns the in-cluster address of the rpc port, built from the headless service name.
     *
     * @param namespace namespace the component runs in
     * @return the value produced by {@code K8sServiceHandler.componentDomainService}
     */
    @Override
    public String k8sInternalService(String namespace) {
        return K8sServiceHandler.componentDomainService(namespace, YUNIKORN_HEADLESS_SERVICE,
            YUNIKORN_RPC_PORT);
    }

    /**
     * Returns the external URL of the rpc port, looked up on the NodePort service.
     *
     * @param namespace namespace the component runs in
     * @return the value produced by {@code deployHandler.getExternalUrlFromService}
     */
    @Override
    public String k8sExternalService(String namespace) {
        return deployHandler.getExternalUrlFromService(namespace, YUNIKORN_NODEPORT_SERVICE, YUNIKORN_RPC_PORT_NAME);
    }

    /**
     * Returns the in-cluster address of the ui port, built from the headless service name.
     *
     * @param namespace namespace the component runs in
     * @return the value produced by {@code K8sServiceHandler.componentDomainService}
     */
    public String k8sInternalUiService(String namespace) {
        return K8sServiceHandler.componentDomainService(namespace, YUNIKORN_HEADLESS_SERVICE,
            YUNIKORN_UI_PORT);
    }

    /**
     * Returns the external URL of the ui port, looked up on the NodePort service.
     *
     * @param namespace namespace the component runs in
     * @return the value produced by {@code deployHandler.getExternalUrlFromService}
     */
    public String k8sExternalUiService(String namespace) {
        return deployHandler.getExternalUrlFromService(namespace, YUNIKORN_NODEPORT_SERVICE, YUNIKORN_UI_PORT_NAME);
    }

    /**
     * Reports the health status of the YuniKorn stateful set as seen by the deploy handler.
     *
     * @param namespace namespace the component runs in
     * @return the result of {@code deployHandler.checkComponentHealthStatus}
     */
    public boolean isRunning(String namespace) {
        return deployHandler.checkComponentHealthStatus(namespace, YUNIKORN_STATEFUL_SET_NAME);
    }

    /**
     * Asks the deploy handler whether the config map, headless service and stateful set exist.
     * The NodePort service is not part of this check.
     *
     * @param namespace namespace the component runs in
     * @return the result of {@code deployHandler.checkCommonResourceExisting}
     */
    public boolean isCommonResourceExisting(String namespace) {
        return deployHandler.checkCommonResourceExisting(namespace, YUNIKORN_CONFIG_MAP_NAME,
            YUNIKORN_HEADLESS_SERVICE, YUNIKORN_STATEFUL_SET_NAME);
    }

}
